package com.trytech.mongoocrawler.server;

import com.trytech.mongoocrawler.common.transport.protocol.HtmltextProtocol;
import com.trytech.mongoocrawler.server.common.enums.LifeCycle;
import com.trytech.mongoocrawler.server.common.queue.DisruptorContext;
import com.trytech.mongoocrawler.server.transport.*;
import com.trytech.mongoocrawler.server.transport.http.CrawlerHttpRequest;
import com.trytech.mongoocrawler.server.transport.http.factory.CrawlerHttpReqFactory;
import com.trytech.mongoocrawler.server.xml.CrawlerXmlConfigBean;

import java.net.MalformedURLException;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 一次爬取会话
 * 会话的生命周期包括：
 * INITIALIZED=0-初始化
 * RUNNING=1-已运行
 * PAUSED=2-暂停
 * DESTORYED=3-已销毁
 */
public final class DistributedCrawlerSession extends CrawlerSession {
    //工作线程池
    private final static TaskThreadPool threadPool = new TaskThreadPool();
    //crawler的配置
    protected CrawlerXmlConfigBean sessionConfig;
    //url内存队列
    protected UrlManager urlManager = new LocalUrlManager();
    //url和htmlparser的关系维护容器
    private ConcurrentHashMap<String, UrlParserPair> urlParserMap = new ConcurrentHashMap();
    //父容器
    private CrawlerContext container;
    //会话ID
    private String sessionId;
    //Disruptor上下文
    private DisruptorContext disruptorContext;
    //运行模式
    private int runmode;

    /***
     * 构造函数
     * @param container 父容器
     */
    public DistributedCrawlerSession(CrawlerContext container, CrawlerXmlConfigBean crawlerXmlConfigBean) {
        this.container = container;
        lifeCycle = LifeCycle.INITIALIZED;
        disruptorContext = DisruptorContext.getInstance(this);
        this.sessionConfig = crawlerXmlConfigBean;
        this.runmode = crawlerXmlConfigBean.getRunmode();
    }

    public DistributedCrawlerSession(CrawlerContext container, CrawlerXmlConfigBean crawlerXmlConfigBean, UrlManager urlManager) {
        this.container = container;
        lifeCycle = LifeCycle.INITIALIZED;
        this.urlManager = urlManager;
        disruptorContext = DisruptorContext.getInstance(this);
        this.sessionConfig = crawlerXmlConfigBean;
        this.runmode = crawlerXmlConfigBean.getRunmode();
    }

    public UrlParserPair fetchUrl() {
        UrlParserPair urlParserPair = urlManager.fetchUrl();
        if (urlManager.isQueueEmpty()) {
            lifeCycle = LifeCycle.FREE;
        }
        return urlParserPair;
    }

    public void parse(HtmltextProtocol htmltextProtocol) throws MalformedURLException {
        UrlParserPair urlParserPair = urlParserMap.get(htmltextProtocol.getUrl());
        CrawlerHttpRequest request = CrawlerHttpReqFactory.getRequest(urlParserPair.toURL(), urlParserPair.getHtmlParser());
        request.setSessionId(sessionId);
        lifeCycle = LifeCycle.RUNNING;
        submitTask(request);
    }

    @Override
    public String getSessionId() {
        return generator.generate();
    }

    public void pause() {
        lifeCycle = LifeCycle.PAUSED;
    }

    public void destory() {
        lifeCycle = LifeCycle.DESTORYED;
        container.removeSession(sessionId);
        container.checkStatus();
        container = null;
    }

    public boolean isPaused() {
        return lifeCycle == LifeCycle.PAUSED;
    }

    public boolean isDestoryed() {
        return lifeCycle == LifeCycle.DESTORYED;
    }

    public DisruptorContext getDisruptorContext() {
        return disruptorContext;
    }

    public void submitTask(CrawlerHttpRequest request) {
        threadPool.submit(new TaskWorker(request, this));
    }

    public CrawlerXmlConfigBean getSessionConfig() {
        return this.sessionConfig;
    }

    public int getRunmode() {
        return runmode;
    }

    public void setRunmode(int runmode) {
        this.runmode = runmode;
    }

    public void pushUrl(UrlParserPair urlParserPair) {
        urlParserMap.put(urlParserPair.getUrl(), urlParserPair);
        urlManager.pushUrl(urlParserPair);
    }

    public void ifNeedClose() {
        if (isPaused()) {
            container.interruptSession(sessionId);
        }
    }

    private enum RUN_MODE {
        NON_STOP(1), STATUS_MONITOR(2), RUN_UNTIL_EMPTY(3);
        private int mode;

        private RUN_MODE(int mode) {
            this.mode = mode;
        }

        public static RUN_MODE parseMode(int mode) {
            switch (mode) {
                case 3:
                    return RUN_UNTIL_EMPTY;
                case 2:
                    return STATUS_MONITOR;
                default:
                    return NON_STOP;
            }
        }

        public int getMode() {
            return mode;
        }
    }
}
